package com.tcl.kafka.listener;

import com.alibaba.cloud.nacos.NacosConfigManager;
import com.alibaba.nacos.api.config.ConfigChangeEvent;
import com.alibaba.nacos.api.config.ConfigChangeItem;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.client.config.listener.impl.AbstractConfigChangeListener;
import com.tcl.kafka.config.KafkaStartPauseProperties;
import com.tcl.kafka.constant.StartPauseConstant;
import com.tcl.kafka.core.KafkaService;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.PostConstruct;
import java.util.Objects;

/**
 * Listens for Nacos configuration changes and pauses or starts Kafka listeners
 * based on the changed values.
 *
 * <p>The listener is registered once after construction, for the data id and
 * group supplied by {@link KafkaStartPauseProperties}.
 *
 * @author liangxi.zeng
 */
@Slf4j
public class NacosConfigListener {

    /** Gives access to the Nacos config service the listener is registered on. */
    private final NacosConfigManager nacosConfigManager;

    /** Receives the pause/start requests derived from configuration changes. */
    private final KafkaService kafkaService;

    /** Supplies the data id and group to listen on. */
    private final KafkaStartPauseProperties kafkaStartPauseProperties;

    /**
     * Creates the listener with its collaborators.
     *
     * @param nacosConfigManager        manager used to obtain the Nacos config service
     * @param kafkaService              service that pauses/starts listeners by comma-separated ids
     * @param kafkaStartPauseProperties properties holding the listened data id and group
     */
    public NacosConfigListener(NacosConfigManager nacosConfigManager,KafkaService kafkaService,KafkaStartPauseProperties kafkaStartPauseProperties) {
        this.nacosConfigManager = nacosConfigManager;
        this.kafkaService = kafkaService;
        this.kafkaStartPauseProperties = kafkaStartPauseProperties;
    }


    /**
     * Registers the Nacos configuration change listener.
     *
     * <p>A {@link NacosException} raised during registration is logged and not
     * rethrown, so a registration failure does not abort bean initialization.
     */
    @PostConstruct
    private void reloadConfig() {
        try {
            nacosConfigManager.getConfigService().addListener(
                    kafkaStartPauseProperties.getListenerDataId(),
                    kafkaStartPauseProperties.getListenerGroup(),
                    new AbstractConfigChangeListener() {
                        @Override
                        public void receiveConfigChange(final ConfigChangeEvent event) {
                            applyChange(event);
                        }
                    });
        } catch (NacosException e) {
            log.error("监听kafka启停参数异常",e);
        }
    }

    /**
     * Applies one configuration change event: the pause parameter is handled
     * first, then the start parameter.
     *
     * @param event the change event delivered by Nacos
     */
    private void applyChange(ConfigChangeEvent event) {
        // Changed "pause" parameter, or null when that key did not change.
        ConfigChangeItem pauseListeners = event.getChangeItem(StartPauseConstant.LISTENER_PAUSE_PARAM);
        // Changed "start" parameter, or null when that key did not change.
        ConfigChangeItem startListeners = event.getChangeItem(StartPauseConstant.LISTENER_START_PARAM);

        String pauseIds = usableValue(pauseListeners, StartPauseConstant.LISTENER_PAUSE_PARAM);
        if (Objects.nonNull(pauseIds)) {
            log.info("配置暂停listener:{}",pauseIds);
            kafkaService.pauseMultiByComma(pauseIds);
        }

        String startIds = usableValue(startListeners, StartPauseConstant.LISTENER_START_PARAM);
        if (Objects.nonNull(startIds)) {
            log.info("配置启动listener:{}",startIds);
            kafkaService.startMultiByComma(startIds);
        }
    }

    /**
     * Returns the new value of a change item when it names at least one listener.
     *
     * <p>A removed key produces a change item with a {@code null} new value, and
     * an emptied key produces a blank one; neither identifies a listener, so
     * such changes are skipped instead of being forwarded to the Kafka service.
     *
     * @param item     the change item, or {@code null} when the key did not change
     * @param paramKey the configuration key, used only for logging
     * @return the unmodified new value, or {@code null} when there is nothing to apply
     */
    private static String usableValue(ConfigChangeItem item, String paramKey) {
        if (Objects.isNull(item)) {
            return null;
        }
        String newValue = item.getNewValue();
        if (Objects.isNull(newValue) || newValue.trim().isEmpty()) {
            log.info("Ignoring change of '{}': new value is empty or removed", paramKey);
            return null;
        }
        return newValue;
    }


}
